/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.schema.tuple;

import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.util.ScanUtil.isDummy;

import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.hbase.Cell;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;

/**
 * List implementation that provides indexed based look up when the cell column qualifiers are
 * positive numbers. These qualifiers are generated by using one of the column qualifier encoding
 * schemes specified in {@link QualifierEncodingScheme}. The api methods in this list assume that
 * the caller wants to see and add only non null elements in the list.
 * <p>
 * Please note that this implementation doesn't implement all the optional methods of the
 * {@link List} interface. Such unsupported methods could violate the basic invariance of the list
 * that every cell with an encoded column qualifier has a fixed position in the list.
 * </p>
 * <p>
 * An important performance characteristic of this list is that doing look up on the basis of index
 * via {@link #get(int)} is an O(n) operation. This makes iterating through the list using
 * {@link #get(int)} an O(n^2) operation. Instead, for iterating through the list, one should use
 * the iterators created through {@link #iterator()} or {@link #listIterator()}. Do note that
 * getting an element using {@link #getCellForColumnQualifier(byte[])} or
 * {@link #getCellForColumnQualifier(byte[], int, int)} is an O(1) operation and should generally be
 * the way for accessing elements in the list.
 * </p>
 */
@NotThreadSafe
public class EncodedColumnQualiferCellsList implements List<Cell> {

  private int minQualifier;
  private int maxQualifier;
  private int nonReservedRangeOffset;
  private final Cell[] array;
  private int numNonNullElements;
  private int firstNonNullElementIdx = -1;
  private static final int RESERVED_RANGE_SIZE =
    ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
  // Used by iterators to figure out if the list was structurally modified.
  private int modCount = 0;
  private final QualifierEncodingScheme encodingScheme;

  public EncodedColumnQualiferCellsList(int minQ, int maxQ,
    QualifierEncodingScheme encodingScheme) {
    checkArgument(minQ <= maxQ, "Invalid arguments. Min: " + minQ + ". Max: " + maxQ);
    this.minQualifier = minQ;
    this.maxQualifier = maxQ;
    int size = 0;
    if (maxQ < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
      size = RESERVED_RANGE_SIZE;
    } else if (minQ < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
      size = (maxQ - minQ + 1);
    } else {
      size = RESERVED_RANGE_SIZE + (maxQ - minQ + 1);
    }
    this.array = new Cell[size];
    this.nonReservedRangeOffset =
      minQ > ENCODED_CQ_COUNTER_INITIAL_VALUE ? minQ - ENCODED_CQ_COUNTER_INITIAL_VALUE : 0;
    this.encodingScheme = encodingScheme;
  }

  @Override
  public int size() {
    return numNonNullElements;
  }

  @Override
  public boolean isEmpty() {
    return numNonNullElements == 0;
  }

  @Override
  public boolean contains(Object o) {
    return indexOf(o) >= 0;
  }

  @Override
  public Object[] toArray() {
    Object[] toReturn = new Object[numNonNullElements];
    int counter = 0;
    if (numNonNullElements > 0) {
      for (int i = 0; i < array.length; i++) {
        if (array[i] != null) {
          toReturn[counter++] = array[i];
        }
      }
    }
    return toReturn;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> T[] toArray(T[] a) {
    T[] toReturn = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(),
      numNonNullElements);
    int counter = 0;
    for (int i = 0; i < array.length; i++) {
      if (array[i] != null) {
        toReturn[counter++] = (T) array[i];
      }
    }
    return toReturn;
  }

  @Override
  public boolean add(Cell e) {
    if (e == null) {
      throw new NullPointerException();
    }
    if (isDummy(e)) {
      array[0] = e;
      firstNonNullElementIdx = 0;
      numNonNullElements = 1;
      return true;
    }
    int columnQualifier =
      encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());

    checkQualifierRange(columnQualifier);
    int idx = getArrayIndex(columnQualifier);
    if (array[idx] == null) {
      numNonNullElements++;
    }
    array[idx] = e;
    if (firstNonNullElementIdx == -1) {
      firstNonNullElementIdx = idx;
    } else if (idx < firstNonNullElementIdx) {
      firstNonNullElementIdx = idx;
    }
    modCount++;
    /*
     * Note that we don't care about equality of the element being added with the element already
     * present at the index.
     */
    return true;
  }

  @Override
  public boolean remove(Object o) {
    if (o == null) {
      return false;
    }
    Cell e = (Cell) o;
    int i = 0;
    while (i < array.length) {
      if (array[i] != null && array[i].equals(e)) {
        array[i] = null;
        numNonNullElements--;
        if (numNonNullElements == 0) {
          firstNonNullElementIdx = -1;
        } else if (firstNonNullElementIdx == i) {
          // the element being removed was the first non-null element we knew
          adjustFirstNonNullElement();
        }
        modCount++;
        return true;
      }
      i++;
    }
    return false;
  }

  @Override
  public boolean containsAll(Collection<?> c) {
    boolean containsAll = true;
    Iterator<?> itr = c.iterator();
    while (itr.hasNext()) {
      containsAll &= (indexOf(itr.next()) >= 0);
    }
    return containsAll;
  }

  @Override
  public boolean addAll(Collection<? extends Cell> c) {
    boolean changed = false;
    for (Cell cell : c) {
      if (c == null) {
        throw new NullPointerException();
      }
      changed |= add(cell);
    }
    return changed;
  }

  @Override
  public boolean addAll(int index, Collection<? extends Cell> c) {
    throwGenericUnsupportedOperationException();
    return false;
  }

  @Override
  public boolean removeAll(Collection<?> c) {
    Iterator<?> itr = c.iterator();
    boolean changed = false;
    while (itr.hasNext()) {
      changed |= remove(itr.next());
    }
    return changed;
  }

  @Override
  public boolean retainAll(Collection<?> collection) {
    boolean changed = false;
    // Optimize if the passed collection is an instance of EncodedColumnQualiferCellsList
    if (collection instanceof EncodedColumnQualiferCellsList) {
      EncodedColumnQualiferCellsList list = (EncodedColumnQualiferCellsList) collection;
      ListIterator<Cell> listItr = this.listIterator();
      while (listItr.hasNext()) {
        Cell cellInThis = listItr.next();
        int qualifier = encodingScheme.decode(cellInThis.getQualifierArray(),
          cellInThis.getQualifierOffset(), cellInThis.getQualifierLength());
        try {
          Cell cellInParam = list.getCellForColumnQualifier(qualifier);
          if (cellInParam != null && cellInParam.equals(cellInThis)) {
            continue;
          }
          listItr.remove();
          changed = true;
        } catch (IndexOutOfBoundsException expected) {
          // this could happen when the qualifier of cellInParam lies out of
          // the range of this list.
          listItr.remove();
          changed = true;
        }
      }
    } else {
      throw new UnsupportedOperationException(
        "Operation only supported for collections of type EncodedColumnQualiferCellsList");
    }
    return changed;
  }

  @Override
  public void clear() {
    for (int i = 0; i < array.length; i++) {
      array[i] = null;
    }
    firstNonNullElementIdx = -1;
    numNonNullElements = 0;
    modCount++;
  }

  @Override
  public Cell get(int index) {
    rangeCheck(index);
    int numNonNullElementsFound = 0;
    for (int i = firstNonNullElementIdx; i < array.length; i++) {
      if (array[i] != null) {
        numNonNullElementsFound++;
        if (numNonNullElementsFound == index + 1) {
          return array[i];
        }
      }
    }
    throw new IllegalStateException("There was no element present in the list at index " + index
      + " even though number of elements in the list are " + size());
  }

  @Override
  public Cell set(int index, Cell e) {
    throwGenericUnsupportedOperationException();
    return null;
  }

  @Override
  public void add(int index, Cell element) {
    throwGenericUnsupportedOperationException();
  }

  @Override
  public Cell remove(int index) {
    throwGenericUnsupportedOperationException();
    return null;
  }

  @Override
  public int indexOf(Object o) {
    if (o == null || isEmpty()) {
      return -1;
    } else {
      int numNonNull = -1;
      for (int i = 0; i < array.length; i++) {
        if (array[i] != null) {
          numNonNull++;
        }
        if (o.equals(array[i])) {
          return numNonNull;
        }
      }
    }
    return -1;
  }

  @Override
  public int lastIndexOf(Object o) {
    if (o == null || isEmpty()) {
      return -1;
    }
    int lastIndex = numNonNullElements;
    for (int i = array.length - 1; i >= 0; i--) {
      if (array[i] != null) {
        lastIndex--;
      }
      if (o.equals(array[i])) {
        return lastIndex;
      }
    }
    return -1;
  }

  @Override
  public ListIterator<Cell> listIterator() {
    return new ListItr();
  }

  @Override
  public ListIterator<Cell> listIterator(int index) {
    throwGenericUnsupportedOperationException();
    return null;
  }

  @Override
  public List<Cell> subList(int fromIndex, int toIndex) {
    throwGenericUnsupportedOperationException();
    return null;
  }

  @Override
  public Iterator<Cell> iterator() {
    return new Itr();
  }

  /**
   * @param qualifierBytes bytes of the column qualifier which serves as the index
   * @return {@link Cell} at the index
   */
  public Cell getCellForColumnQualifier(byte[] qualifierBytes) {
    int columnQualifier = encodingScheme.decode(qualifierBytes);
    return getCellForColumnQualifier(columnQualifier);
  }

  /**
   * @param qualifierBytes bytes of the column qualifier which serves as the index
   * @param offset         offset in the byte array
   * @param length         length starting from offset
   * @return {@link Cell} at the index
   */
  public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, int length) {
    int columnQualifier = encodingScheme.decode(qualifierBytes, offset, length);
    return getCellForColumnQualifier(columnQualifier);
  }

  private void adjustFirstNonNullElement() {
    int i = firstNonNullElementIdx;
    while (i < array.length && (array[i]) == null) {
      i++;
    }
    if (i < array.length) {
      firstNonNullElementIdx = i;
    } else {
      firstNonNullElementIdx = -1;
    }

  }

  private Cell getCellForColumnQualifier(int columnQualifier) {
    checkQualifierRange(columnQualifier);
    int idx = getArrayIndex(columnQualifier);
    Cell c = array[idx];
    return c;
  }

  public Cell getFirstCell() {
    if (firstNonNullElementIdx == -1) {
      throw new NoSuchElementException("No elements present in the list");
    }
    return array[firstNonNullElementIdx];
  }

  private void checkQualifierRange(int qualifier) {
    if (qualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
      return; // space in the array for reserved range is always allocated.
    }
    if (qualifier < minQualifier || qualifier > maxQualifier) {
      throw new IndexOutOfBoundsException("Qualifier " + qualifier
        + " is out of the valid range - (" + minQualifier + ", " + maxQualifier + ")");
    }
  }

  private void rangeCheck(int index) {
    if (index < 0 || index >= size()) {
      throw new IndexOutOfBoundsException();
    }
  }

  private int getArrayIndex(int columnQualifier) {
    checkArgument(columnQualifier >= ENCODED_EMPTY_COLUMN_NAME);
    if (columnQualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE) {
      return columnQualifier;
    }
    return columnQualifier - nonReservedRangeOffset;
  }

  private void throwGenericUnsupportedOperationException() {
    throw new UnsupportedOperationException(
      "Operation cannot be supported because it potentially violates the invariance contract of this list implementation");
  }

  private class Itr implements Iterator<Cell> {
    protected int nextIndex = 0;
    protected int lastRet = -1;
    protected int expectedModCount = modCount;

    private Itr() {
      moveForward(true);
    }

    @Override
    public boolean hasNext() {
      return nextIndex != -1;
    }

    @Override
    public Cell next() {
      checkForCoModification();
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      Cell next = array[nextIndex];
      lastRet = nextIndex;
      moveForward(false);
      modCount++;
      expectedModCount = modCount;
      return next;
    }

    @Override
    public void remove() {
      if (lastRet < 0) {
        throw new IllegalStateException();
      }
      checkForCoModification();
      array[lastRet] = null;
      if (firstNonNullElementIdx == lastRet) {
        // the element being removed was the first non-null element we knew
        adjustFirstNonNullElement();
      }
      lastRet = -1;
      numNonNullElements--;
      modCount++;
      expectedModCount = modCount;
    }

    protected void moveForward(boolean init) {
      int i = init ? 0 : nextIndex + 1;
      while (i < array.length && (array[i]) == null) {
        i++;
      }
      if (i < array.length) {
        nextIndex = i;
      } else {
        nextIndex = -1;
      }
    }

    protected void checkForCoModification() {
      if (modCount != expectedModCount) {
        throw new ConcurrentModificationException();
      }
    }

  }

  private class ListItr extends Itr implements ListIterator<Cell> {
    private int previousIndex = -1;

    private ListItr() {
      moveForward(true);
    }

    @Override
    public boolean hasNext() {
      return nextIndex != -1;
    }

    @Override
    public boolean hasPrevious() {
      return previousIndex != -1;
    }

    @Override
    public Cell previous() {
      if (previousIndex == -1) {
        throw new NoSuchElementException();
      }
      checkForCoModification();
      lastRet = previousIndex;
      movePointersBackward();
      return array[lastRet];
    }

    @Override
    public int nextIndex() {
      return nextIndex;
    }

    @Override
    public int previousIndex() {
      return previousIndex;
    }

    @Override
    public void remove() {
      if (lastRet == nextIndex) {
        moveNextPointer(nextIndex);
      }
      super.remove();
      expectedModCount = modCount;
    }

    @Override
    public void set(Cell e) {
      if (lastRet == -1) {
        throw new IllegalStateException();
      }
      int columnQualifier = encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(),
        e.getQualifierLength());
      int idx = getArrayIndex(columnQualifier);
      if (idx != lastRet) {
        throw new IllegalArgumentException("Cell " + e + " with column qualifier " + columnQualifier
          + " belongs at index " + idx + ". It cannot be added at the position " + lastRet
          + " to which the previous next() or previous() was pointing to.");
      }
      EncodedColumnQualiferCellsList.this.add(e);
      expectedModCount = modCount;
    }

    @Override
    public void add(Cell e) {
      throwGenericUnsupportedOperationException();
    }

    @Override
    protected void moveForward(boolean init) {
      if (!init) {
        previousIndex = nextIndex;
      }
      int i = init ? 0 : nextIndex + 1;
      moveNextPointer(i);
    }

    private void moveNextPointer(int i) {
      while (i < array.length && (array[i]) == null) {
        i++;
      }
      if (i < array.length) {
        nextIndex = i;
      } else {
        nextIndex = -1;
      }
    }

    private void movePointersBackward() {
      nextIndex = previousIndex;
      int i = previousIndex - 1;
      movePreviousPointer(i);
    }

    private void movePreviousPointer(int i) {
      for (; i >= 0; i--) {
        if (array[i] != null) {
          previousIndex = i;
          break;
        }
      }
      if (i < 0) {
        previousIndex = -1;
      }
    }
  }

}
